package com.it.cloud.producer.partition;

import com.it.cloud.producer.dto.MessageInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka producer demo that is configured with a custom partitioner.
 *
 * <p>Sends a single record to the topic {@code my-topic} through a producer whose
 * {@code partitioner.class} is set to
 * {@code com.it.cloud.producer.partition.MyPartitionor}. How that partitioner maps
 * a record to a partition is not visible from this class.
 */
public class PartitionorProducer {

    /**
     * Builds the message to send.
     *
     * <p>The ID is the string form of a random integer in the range 0 to 9
     * inclusive ({@code Random.nextInt(10)} excludes the upper bound). The body is
     * a fixed text.
     *
     * @return a new {@link MessageInfo} with its ID and message text populated
     */
    private static MessageInfo createMessageInfo() {
        // nextInt(10) yields a value in [0, 10), i.e. 0..9 -- not 1..10.
        Random random = new Random();
        int id = random.nextInt(10);

        MessageInfo info = new MessageInfo();
        info.setId(String.valueOf(id));
        info.setMsg("这里是消息");
        return info;
    }

    /**
     * Configures the producer, sends one record and waits for the send to complete.
     *
     * <p>The message ID is used as the record key and the message text as the
     * record value. Any failure (including one raised while creating or closing
     * the producer) is reported on standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Register the custom partitioner.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.it.cloud.producer.partition.MyPartitionor");

        // try-with-resources closes the producer before any catch block runs, so the
        // interrupt flag restored below cannot disturb the close.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            MessageInfo info = createMessageInfo();
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", info.getId(), info.getMsg());

            // Block on the returned future instead of sleeping for a fixed time:
            // this waits exactly as long as needed and surfaces asynchronous send
            // failures, which a fire-and-forget send would silently drop.
            producer.send(record).get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so it is not lost.
            Thread.currentThread().interrupt();
            System.out.println("Send message exception" + e);
        } catch (Exception e) {
            System.out.println("Send message exception" + e);
        }
    }

}
